Solaris Batch Export: A utility to batch export full bitdepth images from the PerkinElmer Solaris imaging system

Copyright (C) 2017 Ethan LaRochelle - Thayer School of Engineering at Dartmouth College
ethan.phillip.m.larochelle.th@dartmouth.edu
14 Engineering Drive, Hanover, NH 03755

This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with this program. If not, see http://www.gnu.org/licenses/.


In [ ]:
# Import the standard-library and third-party packages used below

# Read/write files and directories
import os
# Numeric Python
import numpy
# Read JSON file format
import json
# Import imaging packages
import skimage
from skimage import io
# Ignore warnings so they won't be displayed
import warnings
warnings.filterwarnings('ignore')

To do

  • Generally clean up code
  • Consolidate 'Group' and 'No Group' code segments
  • Consolidate code to process 'Unmixed' and regular snapshots
  • Make easier for end-user (more interactive)
    • Either use config file or user-input (CLI or web form)
      • Input/output directory
      • Image size
      • Search term
  • Batch process video files

Modify section below


In [ ]:
## MODIFY HERE ##
# Root folders for the raw Solaris data (input) and the exported
# images (output), followed by the experiment sub-folder that is
# appended to both of them
input_root_dir = 'D:\\\\SolarisData\\Research\\'
output_root_dir = 'D:\\\\ExperimentData\\Research\\'
cur_experiment_dir = 'Mouse_Experiment'

# Every image file is expected to carry this term in its name
search_term = 'Snapshot'

# Set write_files to False while testing: nothing is written
# to disk, which makes a run slightly faster
write_files = True

# The Solaris offers three different image sizes;
# 1024x1024 is the one we generally use
height = width = 1024

Constants


In [ ]:
# Full input/output folders for the current experiment
input_dir = os.path.join(input_root_dir, cur_experiment_dir)
output_dir = os.path.join(output_root_dir, cur_experiment_dir)
# makedirs (rather than mkdir) also creates any missing parent folders,
# so a brand-new output root does not abort the run
if not os.path.isdir(output_dir):
    os.makedirs(output_dir)

# The following generally stays the same
# Group file is used to store names of experiments, but it is not always used
groups_file = os.path.join(input_dir, 'groups.svd')

# The metadata files identify the imaging channel by number.
# This table converts that number (as a string) into the
# readable label used when naming the output files
channels = dict([
    ('1', '470'),
    ('2', '660'),
    ('3', '750'),
    ('4', '800'),
    ('5', 'ChannelError'),
])
# The file extension tells which type of image a file holds.
# This table converts the extension into the label used
# when naming the output files
image_types = dict(ssr='RGB', ssa='Monochrome', ssm='Side-by-Side')
# In an advanced mode the user can acquire images using a
# Liquid Crystal Tunable Filter (LCTF).
# In this mode an image is acquired with each of the emission
# filters 520 to 620 (steps of 10); Target, Tissue, and Food are
# computed by the unmixing algorithm on the system
LCTF_channels = list(map(str, range(520, 630, 10))) + ['Target', 'Tissue', 'Food']

In [ ]:
# Open and read the data in the group file
# This may be empty (If it is empty use the 'No Groups' code below)
use_group_meta = False
# Always define study_data so later cells can reference it safely
study_data = []
if os.path.isfile(groups_file):
    with open(groups_file) as data_file:
        try:
            study_data = json.load(data_file)
        except ValueError:
            # A zero-byte or malformed group file cannot be parsed;
            # report it and fall back to the 'No Groups' processing
            print('Could not parse {}; ignoring group metadata'.format(groups_file))
            study_data = []
    # Any empty payload ([], {}, null) means there are no groups to use
    use_group_meta = bool(study_data)

Main function to read image files


In [ ]:
# This is the main function to read the image files in a directory
def read_solaris_image_set(directory, file_name, lctf_channel=False, image_height=None, channel_map=None):
    """Read one raw Solaris image file together with its snapshot metadata.

    Parameters:
        directory: folder that contains ``file_name``.
        file_name: name of the raw image file. The text after the last '.'
            selects the decoding: 'ssr' is read as 8-bit data with three
            values per pixel, 'ssm' is not read at all, and anything else
            is read as 16-bit single-value data.
        lctf_channel: when true, 'metadata.svd' is read from the parent of
            ``directory`` instead of from ``directory`` itself.
        image_height: number of image rows. Defaults to the module-level
            ``height`` setting.
        channel_map: dictionary converting the metadata channel number
            (as a string) to a readable name. Defaults to the module-level
            ``channels`` table.

    Returns:
        ``[image, image_info]`` where ``image`` is a numpy array and
        ``image_info`` is a dictionary with the keys 'channel_num',
        'channel_name', 'snapshot_name' and 'field_name'.
        ``None`` is returned for side-by-side ('ssm') files.

    Raises:
        KeyError: the metadata lacks 'Channel' or 'DataName', or the
            channel number is not present in ``channel_map``.
        ValueError: the file does not hold a whole number of rows for
            the given ``image_height``.
    """
    # Fall back to the notebook-level settings when not given explicitly
    if image_height is None:
        image_height = height
    if channel_map is None:
        channel_map = channels

    # Read snapshot metadata
    if lctf_channel:
        # LCTF channels store the metadata in the parent directory
        # The '..' is the notation to move up a directory
        metadata_path = os.path.join(directory, '..', 'metadata.svd')
    else:
        metadata_path = os.path.join(directory, 'metadata.svd')

    with open(metadata_path) as metadata_file:
        snapshot_metadata = json.load(metadata_file)
    # Using the data from the metadata file in the snapshot directory
    # we can extract extra information about the type of image
    current_channel_num = str(snapshot_metadata['Channel'])
    current_channel = channel_map[current_channel_num]
    snapshot_name = snapshot_metadata['DataName']

    # Construct file name of image file
    current_full_file = os.path.join(directory, file_name)
    # Find the image file extension; splitext takes the text after the
    # LAST dot, so names that contain extra dots are still recognised
    field_name = os.path.splitext(file_name)[1][1:]

    # Store all the image information in a single dictionary
    image_info = {
        'channel_num': current_channel_num,
        'channel_name': current_channel,
        'snapshot_name': snapshot_name,
        'field_name': field_name
    }
    # Print debug information about current file
    print('Reading: {}\n\t{}'.format(current_full_file, image_info))

    # Side-by-side images are never read
    if field_name == 'ssm':
        return None

    if field_name == 'ssr':
        # 8-bit color image (R G B)
        pixel_dtype, values_per_pixel = 'uint8', 3
    else:
        # 16-bit monochrome (fluorescent) image
        pixel_dtype, values_per_pixel = 'uint16', 1
    # numpy.fromfile opens the path itself, so no separate file handle is needed
    byte_array = numpy.fromfile(current_full_file, dtype=pixel_dtype)

    # Calculate width from the length of the array, accounting for the
    # number of values stored per pixel
    width, leftover = divmod(numpy.size(byte_array), image_height * values_per_pixel)
    if leftover or width == 0:
        raise ValueError('{} holds {} values, which does not fill {} rows'.format(
            current_full_file, numpy.size(byte_array), image_height))

    # Reconstruct image from array
    if field_name == 'ssr':
        reconstructed_im = numpy.reshape(byte_array, [image_height, width, 3])
    else:
        reconstructed_im = numpy.reshape(byte_array, [image_height, width])
        # Flip fluorescent image (up-down)
        reconstructed_im = numpy.flipud(reconstructed_im)
        # Rotate image -90 degrees
        reconstructed_im = numpy.rot90(reconstructed_im, -1)
    return [reconstructed_im, image_info]

Group file


In [ ]:
# If the group file is used, we want to
# include this in the output file names
def read_all_file_with_group(study_data, input_dir, output_dir, channels=channels, image_types=image_types, LCTF_channels=LCTF_channels):
    """Read (and optionally export) every snapshot listed in the group data.

    Parameters:
        study_data: iterable of group dictionaries; each one must provide
            'Name' (the group name) and 'SubjectNames' (the sub-folders of
            ``input_dir`` to process for that group).
        input_dir: folder containing one sub-folder per subject/time point.
        output_dir: folder the .tif files are written to when the
            module-level ``write_files`` flag is true.
        channels: not used by this function; kept so existing calls keep working.
        image_types: dictionary converting a file extension to the image
            type label used in file names and as result key.
        LCTF_channels: names of the sub-folders accepted inside an
            'Unmixed' snapshot folder.

    Returns:
        Nested dictionary
        ``{group: {time point: {snapshot folder: {image type: array}}}}``.
        For 'Unmixed' snapshot folders there is one extra level keyed by
        LCTF channel between the snapshot folder and the image type.

    Only folder and file names containing the module-level ``search_term``
    are processed. Side-by-side ('.ssm') files are skipped entirely.
    """

    def process_image_dir(image_dir, group_name, time_point, lctf_channel=None):
        """Read every matching image file in image_dir; return {image type: array}."""
        is_lctf = lctf_channel is not None
        images = {}
        for image_file in os.listdir(image_dir):
            # Limit to only files with search term i.e. 'Snapshot'
            if search_term not in image_file:
                continue
            # Side-by-side images are never read. Skipping the whole
            # iteration keeps the previous file's image from being saved
            # and stored a second time (or an undefined variable from
            # being used when the .ssm file happens to be listed first)
            if '.ssm' in image_file:
                continue
            reconstructed_im, image_info = read_solaris_image_set(image_dir, image_file, is_lctf)
            image_type = image_types[image_info['field_name']]

            if write_files:
                # Unmixed images are labelled with their LCTF channel,
                # regular images with the excitation channel name
                if is_lctf:
                    channel_label = 'LCTF{}'.format(lctf_channel)
                else:
                    channel_label = image_info['channel_name']
                # Construct output file name
                output_filename = '{}_{}_{}_{}_{}'.format(group_name,
                                                          time_point,
                                                          image_type,
                                                          channel_label,
                                                          image_info['snapshot_name'])
                # Remove unsafe characters in file name
                safe_filename = "".join([c for c in output_filename if c.isalpha() or c.isdigit() or c==' ' or c=='_']).rstrip()
                # Save as .TIF file
                skimage.io.imsave(os.path.join(output_dir, '{}.tif'.format(safe_filename)), reconstructed_im)

            # Store image array in dictionary
            images[image_type] = reconstructed_im
        return images

    # Create a new dictionary to store the image data
    solaris_images = {}

    # The group file will indicate the names of the experiments, so we loop through all of these
    for group in study_data:
        group_name = group['Name']
        solaris_images[group_name] = {}
        # Print the group name for debug
        print('{}'.format(group_name))
        # Within each group/experiment there can be multiple subjects/timepoints
        for time_point in group['SubjectNames']:
            solaris_images[group_name][time_point] = {}
            print('\t{}'.format(time_point))
            timepoint_dir = os.path.join(input_dir, time_point)
            # Each time point can have multiple images which are all stored
            # in their own snapshot directories
            for snapshot_dir in os.listdir(timepoint_dir):
                # Verify the directory has the search_term i.e. "Snapshot" in its name
                if search_term not in snapshot_dir:
                    continue
                snapshot_path = os.path.join(timepoint_dir, snapshot_dir)

                if 'Unmixed' in snapshot_dir:
                    # Using the LCTF, the software can perform spectral unmixing
                    # If that is the case, there is one sub-folder per emission channel
                    snapshot_images = {}
                    for each_channel in os.listdir(snapshot_path):
                        # Verify directory name matches valid LCTF channels
                        if each_channel in LCTF_channels:
                            snapshot_images[each_channel] = process_image_dir(
                                os.path.join(snapshot_path, each_channel),
                                group_name, time_point, each_channel)
                else:
                    # Not a spectrally unmixed image set
                    snapshot_images = process_image_dir(snapshot_path, group_name, time_point)

                solaris_images[group_name][time_point][snapshot_dir] = snapshot_images
    return solaris_images

No Groups


In [ ]:
# If the group file is NOT used,
# we can read the image data, but the process
# is a little different
def read_all_file_without_group(input_dir, output_dir, channels=channels, image_types=image_types, LCTF_channels=LCTF_channels):
    """Read (and optionally export) every snapshot found under input_dir.

    Parameters:
        input_dir: folder containing one sub-folder per subject/time point.
        output_dir: folder the .tif files are written to when the
            module-level ``write_files`` flag is true.
        channels: not used by this function; kept so existing calls keep working.
        image_types: dictionary converting a file extension to the image
            type label used in file names and as result key.
        LCTF_channels: names of the sub-folders accepted inside an
            'Unmixed' snapshot folder.

    Returns:
        Nested dictionary
        ``{time point: {snapshot folder: {image type: array}}}``.
        For 'Unmixed' snapshot folders there is one extra level keyed by
        LCTF channel between the snapshot folder and the image type.

    Only folder and file names containing the module-level ``search_term``
    are processed. Side-by-side ('.ssm') files are skipped entirely, and
    plain files sitting directly in ``input_dir`` are ignored.
    """

    def process_image_dir(image_dir, time_point, lctf_channel=None):
        """Read every matching image file in image_dir; return {image type: array}."""
        is_lctf = lctf_channel is not None
        images = {}
        for image_file in os.listdir(image_dir):
            # Limit to only files with search term i.e. 'Snapshot'
            if search_term not in image_file:
                continue
            # Side-by-side images are never read. Skipping the whole
            # iteration keeps the previous file's image from being saved
            # and stored a second time (or an undefined variable from
            # being used when the .ssm file happens to be listed first)
            if '.ssm' in image_file:
                continue
            reconstructed_im, image_info = read_solaris_image_set(image_dir, image_file, is_lctf)
            image_type = image_types[image_info['field_name']]

            if write_files:
                # Unmixed images are labelled with their LCTF channel,
                # regular images with the excitation channel name
                if is_lctf:
                    channel_label = 'LCTF{}'.format(lctf_channel)
                else:
                    channel_label = image_info['channel_name']
                # Construct output file name
                output_filename = '{}_{}_{}_{}'.format(time_point,
                                                       image_type,
                                                       channel_label,
                                                       image_info['snapshot_name'])
                # Remove unsafe characters in file name
                safe_filename = "".join([c for c in output_filename if c.isalpha() or c.isdigit() or c==' ' or c=='_']).rstrip()
                # Save as .TIF file
                skimage.io.imsave(os.path.join(output_dir, '{}.tif'.format(safe_filename)), reconstructed_im)

            # Store image array in dictionary
            images[image_type] = reconstructed_im
        return images

    # Create a new dictionary to store the image data
    solaris_images = {}

    # Every sub-directory of the input directory is a subject/timepoint
    for time_point in os.listdir(input_dir):
        timepoint_dir = os.path.join(input_dir, time_point)
        # Verify it is a directory and not a file, so stray files
        # (e.g. the group file) are not reported as empty time points
        if not os.path.isdir(timepoint_dir):
            continue
        print('\t{}'.format(time_point))
        solaris_images[time_point] = {}

        for snapshot_dir in os.listdir(timepoint_dir):
            # Verify the search term i.e. 'Snapshot' is found in the name
            if search_term not in snapshot_dir:
                continue
            snapshot_path = os.path.join(timepoint_dir, snapshot_dir)

            if 'Unmixed' in snapshot_dir:
                # Using the LCTF, the software can perform spectral unmixing
                # If that is the case, there is one sub-folder per emission channel
                snapshot_images = {}
                for each_channel in os.listdir(snapshot_path):
                    # Verify directory name matches valid LCTF channels
                    if each_channel in LCTF_channels:
                        snapshot_images[each_channel] = process_image_dir(
                            os.path.join(snapshot_path, each_channel),
                            time_point, each_channel)
            else:
                # Not a spectrally unmixed image set
                snapshot_images = process_image_dir(snapshot_path, time_point)

            solaris_images[time_point][snapshot_dir] = snapshot_images
    return solaris_images

In [ ]:
# Run the reader that matches whether usable group metadata was found
output_images = (
    read_all_file_with_group(study_data, input_dir, output_dir)
    if use_group_meta
    else read_all_file_without_group(input_dir, output_dir)
)

In [ ]: